package org.apache.hadoop.hdfs.tools;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.ZKFailoverController;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

import com.google.protobuf.InvalidProtocolBufferException;

/**
 * DFSZKFailoverController是Hadoop-2.7.0中HDFS NameNode HA实现的中心组件，它负责整体的故障转移控制等。
 * 它是一个守护进程，通过main()方法启动，继承自ZKFailoverController。
 */
@InterfaceAudience.Private
public class DFSZKFailoverController extends ZKFailoverController {

  private static final Log LOG = LogFactory.getLog(DFSZKFailoverController.class);
  private final AccessControlList adminAcl;
  /* the same as superclass's localTarget, but with the more specfic NN type */
  private final NNHAServiceTarget localNNTarget;

  @Override
  protected HAServiceTarget dataToTarget(byte[] data) {
    ActiveNodeInfo proto;
    try {
      proto = ActiveNodeInfo.parseFrom(data);
    } catch (InvalidProtocolBufferException e) {
      throw new RuntimeException("Invalid data in ZK: " +
          StringUtils.byteToHexString(data));
    }
    NNHAServiceTarget ret = new NNHAServiceTarget(
        conf, proto.getNameserviceId(), proto.getNamenodeId());
    InetSocketAddress addressFromProtobuf = new InetSocketAddress(
        proto.getHostname(), proto.getPort());
    
    if (!addressFromProtobuf.equals(ret.getAddress())) {
      throw new RuntimeException("Mismatched address stored in ZK for " +
          ret + ": Stored protobuf was " + proto + ", address from our own " +
          "configuration for this NameNode was " + ret.getAddress());
    }
    
    ret.setZkfcPort(proto.getZkfcPort());
    return ret;
  }

  @Override
  protected byte[] targetToData(HAServiceTarget target) {
    InetSocketAddress addr = target.getAddress();

    return ActiveNodeInfo.newBuilder()
      .setHostname(addr.getHostName())
      .setPort(addr.getPort())
      .setZkfcPort(target.getZKFCAddress().getPort())
      .setNameserviceId(localNNTarget.getNameServiceId())
      .setNamenodeId(localNNTarget.getNameNodeId())
      .build()
      .toByteArray();
  }
  
  @Override
  protected InetSocketAddress getRpcAddressToBindTo() {
    int zkfcPort = getZkfcPort(conf);
    return new InetSocketAddress(localTarget.getAddress().getAddress(),
          zkfcPort);
  }
  

  @Override
  protected PolicyProvider getPolicyProvider() {
    return new HDFSPolicyProvider();
  }
  
  static int getZkfcPort(Configuration conf) {
    return conf.getInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY,
        DFSConfigKeys.DFS_HA_ZKFC_PORT_DEFAULT);
  }

  /**
   * 主要就是构造DFSZKFailoverController对象，并在此之前，通过配置信息，获取一些前置参数，比如：
   * 1）NameNode的命名服务ID：NamenodeNameServiceId；
   * 2）NameNode ID；
   * 3）构造NNHAServiceTarget实例localTarget，NNHAServiceTarget继承自HAServiceTarget，代表一个客户端HA管理命令的目标。
   *       其实就是封装了网络连接的相关地址参数，包括socket地址等；
   * 上述这些在ZooKeeper上注册节点时，是需要通过protobuf序列化后写入的节点数据的。
   */
  public static DFSZKFailoverController create(Configuration conf) {
    // 获取本地NameNode配置信息
    Configuration localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
    // 获取该NameNode的命名服务ID：NamenodeNameServiceId
    String nsId = DFSUtil.getNamenodeNameServiceId(conf);

    // 检测是否支持HA，不支持的话直接抛出异常
    if (!HAUtil.isHAEnabled(localNNConf, nsId)) {
      throw new HadoopIllegalArgumentException( "HA is not enabled for this namenode.");
    }
    // 获取NameNode ID，并校验
    String nnId = HAUtil.getNameNodeId(localNNConf, nsId);
    if (nnId == null) {
      String msg = "Could not get the namenode ID of this node. " + "You may run zkfc on the node other than namenode.";
      throw new HadoopIllegalArgumentException(msg);
    }
    // NameNode初始化通用keys
    NameNode.initializeGenericKeys(localNNConf, nsId, nnId);
    DFSUtil.setGenericConf(localNNConf, nsId, nnId, ZKFC_CONF_KEYS);

    // 构造NNHAServiceTarget实例localTarget，NNHAServiceTarget继承自HAServiceTarget，代表一个客户端HA管理命令的目标。其实就是封装了网络连接的相关地址参数
    NNHAServiceTarget localTarget = new NNHAServiceTarget( localNNConf, nsId, nnId);
    // 利用本地配置、localTarget构造DFSZKFailoverController
    return new DFSZKFailoverController(localNNConf, localTarget);
  }

  private DFSZKFailoverController(Configuration conf,
      NNHAServiceTarget localTarget) {
    super(conf, localTarget);
    this.localNNTarget = localTarget;
    // Setup ACLs
    adminAcl = new AccessControlList(
        conf.get(DFSConfigKeys.DFS_ADMIN, " "));
    LOG.info("Failover controller configured for NameNode " +
        localTarget);
}
  
  
  @Override
  protected void initRPC() throws IOException {
    super.initRPC();
    localNNTarget.setZkfcPort(rpcServer.getAddress().getPort());
  }

  @Override
  public void loginAsFCUser() throws IOException {
    InetSocketAddress socAddr = NameNode.getAddress(conf);
    SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
        DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
  }
  
  @Override
  protected String getScopeInsideParentNode() {
    return localNNTarget.getNameServiceId();
  }

  /**
   * 解析参数，然后通过静态方法构造一个zkfc实例，调用zkfc的run()方法，执行主业务逻辑。
   */
  public static void main(String args[]) throws Exception {
    //  解析参数
    if (DFSUtil.parseHelpArgument(args,  ZKFailoverController.USAGE, System.out, true)) {
      System.exit(0);
    }
    
    GenericOptionsParser parser = new GenericOptionsParser( new HdfsConfiguration(), args);

    // 核心方法：通过静态方法创建DFSZKFailoverController实例zkfc
    DFSZKFailoverController zkfc = DFSZKFailoverController.create( parser.getConfiguration()); //
    int retCode = 0;
    try {
      // 调用zkfc的run()方法，执行主业务逻辑
      retCode = zkfc.run(parser.getRemainingArgs()); //
    } catch (Throwable t) {
      LOG.fatal("Got a fatal error, exiting now", t);
    }
    System.exit(retCode);
  }

  @Override
  protected void checkRpcAdminAccess() throws IOException, AccessControlException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    UserGroupInformation zkfcUgi = UserGroupInformation.getLoginUser();
    if (adminAcl.isUserAllowed(ugi) ||
        ugi.getShortUserName().equals(zkfcUgi.getShortUserName())) {
      LOG.info("Allowed RPC access from " + ugi + " at " + Server.getRemoteAddress());
      return;
    }
    String msg = "Disallowed RPC access from " + ugi + " at " +
        Server.getRemoteAddress() + ". Not listed in " + DFSConfigKeys.DFS_ADMIN; 
    LOG.warn(msg);
    throw new AccessControlException(msg);
  }
}
